Thread version_data through BundleInfo to worker-side bundle initialization #67217
o-nikolas wants to merge 5 commits into
ferruzzi left a comment
Do we want any unit tests? It's just plumbing a value through, so maybe not necessary. Feels pretty trivial on this PR so I'll approve but felt odd not calling it out.
97dc507 to 48a9b1b
…zation

Add version_data to the push path so structured bundle metadata (e.g., S3 manifests) reaches workers at task execution time.

Changes:
- Add version_data field to BundleInfo (workloads/base.py)
- Populate version_data from DagVersion in ExecuteTask.make()
- Add selectinload(TI.dag_version) to scheduler enqueue query to avoid N+1 queries when reading version_data
- Add version_data parameter to BaseDagBundle.__init__ (stored as self.version_data) and DagBundlesManager.get_bundle()
- Pass version_data through task_runner.py and callback_supervisor.py
- Regenerate task-sdk datamodels to include version_data in BundleInfo

Existing bundles ignore version_data (defaults to None). The S3 bundle will use self.version_data in initialize() to fetch specific object versions (follow-up PR).
Address review feedback:
- Use dict[str, Any] | None instead of bare dict | None for version_data in both BaseDagBundle.__init__ and BundleInfo
- Add minimal tests verifying version_data plumbing through the bundle constructor
…stanceKey types The strict TypeError for unknown key types broke executor tests that pass Mock objects or raw tuples as keys (Lambda, Batch, ECS, Kubernetes). Restore the original fallback to CallbackState for any non-TaskInstanceKey, matching main's behavior.
…pat with 3.2.x The test_process_workloads_routes_execute_callback test uses CallbackKey(id=...) which requires the dataclass form introduced in 3.3. In Airflow 3.2.x, CallbackKey is a str type alias and does not accept keyword arguments. Change the skipif guard from AIRFLOW_V_3_2_PLUS to AIRFLOW_V_3_3_PLUS.
e5e65a1 to a5bc3b2
@ashb and @amoghrajesh You might be interested in this one.
if not bundle_info:
    version_data = None
    if ti.dag_version is not None:
        version_data = ti.dag_version.version_data
Why is this read off ti, while the other values just below come from ti.dag_model.bundle*?
Worth flagging that even off the same ti, version and version_data can disagree. Two cases:
- Unpinned runs (disable_bundle_versioning=True): dag_run.bundle_version is None, but ti.dag_version.version_data may still carry a manifest -- so version=None, version_data={...}.
- After _verify_integrity_if_dag_changed (scheduler_job_runner.py:2521-2530): TI's dag_version_id is bumped to the latest version while dag_run.bundle_version is left untouched, so version_data describes a newer version than version reports.
The scheduler picks a deliberate rule for bundle_version at scheduler_job_runner.py:1438-1442; worth deciding the equivalent rule for version_data here (e.g., is it valid to expose version_data when version is None?).
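One possible rule, sketched here only to make the two cases above concrete: expose version_data only when the run is pinned and the version recorded alongside the manifest matches the pinned version. The function name resolve_version_data and the ti_bundle_version argument (the bundle version stored with the TI's DagVersion) are assumptions for illustration, not names from this PR, and this is not necessarily the rule the PR should adopt.

```python
from __future__ import annotations

from typing import Any


def resolve_version_data(
    run_bundle_version: str | None,
    ti_bundle_version: str | None,
    ti_version_data: dict[str, Any] | None,
) -> dict[str, Any] | None:
    """Hypothetical rule: never send a manifest that disagrees with `version`."""
    # Unpinned run: version is None, so do not expose a manifest either.
    if run_bundle_version is None:
        return None
    # The TI was moved to a newer DagVersion while the run stayed pinned:
    # the manifest describes a different version than the one reported.
    if ti_bundle_version != run_bundle_version:
        return None
    return ti_version_data
```

With this rule a worker either receives a (version, version_data) pair that agrees, or no version_data at all. Whether unpinned runs should still see a manifest is exactly the open question raised in the comment.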
- if isinstance(key, CallbackKey):
-     return CallbackState
- raise TypeError(f"Unknown workload key type: {type(key)!r}")
+ return CallbackState
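A minimal sketch of the restored fallback described in the commit message: anything that is not a TaskInstanceKey maps to CallbackState, so Mock objects and raw tuples used as keys in executor tests no longer raise. The classes below are simplified stand-ins, and state_type_for and TaskState are hypothetical names, not the real Airflow identifiers.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass(frozen=True)
class TaskInstanceKey:
    """Simplified stand-in for the real key type."""

    dag_id: str
    task_id: str
    run_id: str


class TaskState:
    """Placeholder for the state type used with task keys (hypothetical name)."""


class CallbackState:
    """Placeholder for the state type used with every other key."""


def state_type_for(key: object) -> type:
    # Only TaskInstanceKey gets the task state type.
    if isinstance(key, TaskInstanceKey):
        return TaskState
    # Lenient fallback: no TypeError for unknown key types.
    return CallbackState
```

The trade-off is that a genuinely wrong key type is silently treated as a callback key; the strict TypeError caught that, but at the cost of breaking existing tests.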
    ranked_query.c.map_index_for_ordering,
)
.options(selectinload(TI.dag_model))
.options(selectinload(TI.dag_version))
Hmm, joins aren't free, and this isn't used in most places.
I'm wondering if this needs to be based on what the bundle backend needs somehow?
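One way to read this suggestion, as a rough sketch: let each bundle class declare whether it consumes version_data, and only request the extra eager load when some configured bundle needs it. The uses_version_data attribute and the eager_load_targets helper are hypothetical; nothing in this PR defines them, and the relationship names are returned as plain strings to keep the sketch free of SQLAlchemy.

```python
from __future__ import annotations


class BaseDagBundle:
    # Hypothetical opt-in flag; existing bundles would ignore version_data.
    uses_version_data: bool = False


class LocalBundle(BaseDagBundle):
    pass


class ManifestBundle(BaseDagBundle):
    # Hypothetical bundle that reads self.version_data in initialize().
    uses_version_data = True


def eager_load_targets(bundle_classes: list[type[BaseDagBundle]]) -> list[str]:
    """Return the TI relationships the enqueue query should eager-load."""
    targets = ["dag_model"]
    # Pay for the extra load only if some configured bundle reads version_data.
    if any(cls.uses_version_data for cls in bundle_classes):
        targets.append("dag_version")
    return targets
```

In the scheduler this list would translate into the corresponding .options(selectinload(...)) calls, so deployments without a manifest-based bundle would skip the dag_version load entirely.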
version_data:
  anyOf:
  - additionalProperties: true
    type: object
  - type: 'null'
If the field is present it should be an object.
(I.e. it's either not sent, or it's an {...} object.)
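A small sketch of the "absent or object" contract suggested here, assuming the producer simply omits the key when there is no manifest instead of sending null. Both helper names (bundle_info_payload, check_version_data) are made up for illustration and do not correspond to the generated datamodels.

```python
from __future__ import annotations

from typing import Any


def bundle_info_payload(
    name: str,
    version: str | None = None,
    version_data: dict[str, Any] | None = None,
) -> dict[str, Any]:
    """Build the wire payload, leaving version_data out when it is None."""
    payload: dict[str, Any] = {"name": name, "version": version}
    if version_data is not None:
        payload["version_data"] = version_data
    return payload


def check_version_data(payload: dict[str, Any]) -> None:
    """Reject payloads where version_data is present but not an object."""
    if "version_data" in payload and not isinstance(payload["version_data"], dict):
        raise ValueError("version_data must be an object when present")
```

Under this contract the schema entry would only need type: object, with the field left out of the required list, rather than an anyOf that includes 'null'.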
- @pytest.mark.skipif(not AIRFLOW_V_3_2_PLUS, reason="ExecuteCallback requires Airflow 3.2+")
+ @pytest.mark.skipif(not AIRFLOW_V_3_3_PLUS, reason="CallbackKey dataclass requires Airflow 3.3+")
name: str,
refresh_interval: int = conf.getint("dag_processor", "refresh_interval"),
version: str | None = None,
version_data: dict[str, Any] | None = None,
Worth updating docstring to add this param on L295
This is PR 2 of the S3 Dag Bundle versioning series. PR 1 (#66491) added the BundleVersion dataclass, Alembic migration, and persistence path. This PR completes the worker-side plumbing so that version data reaches the bundle instance at task execution time.

Adds version_data to BundleInfo and threads it through the worker-side bundle initialization path so that structured version metadata (e.g., S3 manifests) reaches the bundle at task execution time.

Changes:
- BundleInfo gains version_data: dict | None = None field
- ExecuteTask.make() reads version_data from DagVersion (via eagerly-loaded relationship)
- selectinload(TI.dag_version) to avoid N+1 queries
- BaseDagBundle.__init__ accepts and stores version_data
- DagBundlesManager.get_bundle() passes version_data to the bundle constructor
- task_runner.parse() and callback_supervisor pass version_data through
- _generated.py updated with the new field

related: #66491
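The plumbing described above can be pictured with a stripped-down sketch. The classes below are simplified stand-ins rather than the real Airflow definitions; ManifestBundle, the free function get_bundle, and the "objects" manifest layout are hypothetical and exist only to show the value travelling from BundleInfo to a bundle's initialize().

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Any


@dataclass
class BundleInfo:
    """Simplified stand-in for the workload's bundle descriptor."""

    name: str
    version: str | None = None
    version_data: dict[str, Any] | None = None


class BaseDagBundle:
    """Simplified stand-in: accepts and stores version_data."""

    def __init__(
        self,
        *,
        name: str,
        version: str | None = None,
        version_data: dict[str, Any] | None = None,
    ) -> None:
        self.name = name
        self.version = version
        self.version_data = version_data


class ManifestBundle(BaseDagBundle):
    """Hypothetical bundle that consumes version_data in initialize()."""

    def initialize(self) -> None:
        # Assumed manifest layout: {"objects": {key: object_version_id}}.
        manifest = self.version_data or {}
        self.pinned_objects = dict(manifest.get("objects", {}))


def get_bundle(info: BundleInfo, bundle_cls: type[BaseDagBundle] = BaseDagBundle) -> BaseDagBundle:
    """Stand-in for the manager call that forwards version_data to the constructor."""
    return bundle_cls(name=info.name, version=info.version, version_data=info.version_data)
```

Bundles that never look at self.version_data behave as before, since the field defaults to None at every step.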
Was generative AI tooling used to co-author this PR?
Generated-by: Claude Code (Opus 4) following the guidelines